package com.example.demo.stream.sink;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.demo.env.SpringContextHolder;
import java.util.Objects;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Sink that writes every incoming record to MySQL through a MyBatis-Plus mapper.
 *
 * <p>Only the mapper's interface class is handed to the constructor; the mapper bean itself
 * is looked up from {@link SpringContextHolder} when {@link #open(Configuration)} runs.
 *
 * @param <In> type of the records consumed by this sink and inserted by the mapper
 * @author wbx
 */
public class MySQLSink<In> extends RichSinkFunction<In> {

    private static final long serialVersionUID = 1L;

    /**
     * Mapper interface whose bean is looked up when the sink is opened.
     */
    private final Class<? extends BaseMapper<In>> mapperClass;

    /**
     * Mapper instance resolved in {@link #open(Configuration)}.
     *
     * <p>Declared {@code transient} so that a resolved mapper bean is never part of this
     * function's serialized form; only {@link #mapperClass} is carried along and the bean is
     * looked up again on open.
     */
    private transient BaseMapper<In> mapper;


    /**
     * Creates a sink that inserts records through the given mapper type.
     *
     * @param mapperClass mapper interface to look up in {@link #open(Configuration)};
     *                    must not be {@code null}
     * @throws NullPointerException if {@code mapperClass} is {@code null}
     */
    public MySQLSink(Class<? extends BaseMapper<In>> mapperClass) {
        // Fail at construction time instead of at the later bean lookup.
        this.mapperClass = Objects.requireNonNull(mapperClass, "mapperClass");
    }


    /**
     * Resolves the mapper bean for {@link #mapperClass} from {@link SpringContextHolder}.
     *
     * @param parameters configuration passed in by the runtime; not used by this sink
     * @throws IllegalStateException if the lookup yields no bean
     */
    @Override
    public void open(Configuration parameters) {
        // Fetch the mapper object from the application context.
        BaseMapper<In> resolved = SpringContextHolder.getBean(mapperClass);
        if (resolved == null) {
            // Without this check every invoke() would fail with an uninformative NPE.
            throw new IllegalStateException(
                    "No mapper bean available for " + mapperClass.getName());
        }
        mapper = resolved;
    }

    /**
     * Delegates to the superclass; this sink holds no resources of its own to release.
     *
     * @throws Exception if the superclass close fails
     */
    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * Inserts one record through the mapper's {@code insert} method and prints the row count
     * it returns to standard output.
     *
     * @param value   record to insert
     * @param context sink context supplied by the runtime; not used by this sink
     */
    @Override
    public void invoke(In value, Context context) {
        // Write the record to the DB via the configured mapper's insert method.
        int affectedRows = mapper.insert(value);
        System.out.println("受影响的行数：" + affectedRows);
    }
}
